package org.idea.eaglemq.broker.netty.broker;

import com.alibaba.fastjson2.JSON;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import org.idea.eaglemq.broker.event.model.ConsumeMsgEvent;
import org.idea.eaglemq.broker.event.model.PushMsgEvent;
import org.idea.eaglemq.common.coder.TcpMsg;
import org.idea.eaglemq.common.dto.ConsumeMsgReqDTO;
import org.idea.eaglemq.common.dto.MessageDTO;
import org.idea.eaglemq.common.enums.BrokerEventCode;
import org.idea.eaglemq.common.event.EventBus;
import org.idea.eaglemq.common.event.model.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;


/**
 * Inbound Netty handler of the broker: converts each decoded {@link TcpMsg} into the
 * matching broker {@link Event} and publishes it on the {@link EventBus}.
 *
 * <p>Two event codes are handled: {@code BrokerEventCode.PUSH_MSG} (body parsed as a
 * {@link MessageDTO}) and {@code BrokerEventCode.CONSUME_MSG} (body parsed as a
 * {@link ConsumeMsgReqDTO}). Frames that carry any other code, or whose body cannot be
 * turned into a DTO, are logged and dropped instead of failing with a
 * {@link NullPointerException}.
 *
 * <p>The handler is annotated {@code @Sharable}; the only instance state it keeps is the
 * event bus reference passed to the constructor.
 */
@ChannelHandler.Sharable
public class BrokerServerHandler extends SimpleChannelInboundHandler<Object> {

    private static final Logger logger = LoggerFactory.getLogger(BrokerServerHandler.class);

    private final EventBus eventBus;

    /**
     * Creates the handler and initializes the given event bus.
     *
     * @param eventBus bus that receives every event built by this handler; {@code init()}
     *                 is invoked on it here
     */
    public BrokerServerHandler(EventBus eventBus) {
        this.eventBus = eventBus;
        this.eventBus.init();
    }

    /**
     * Dispatches one inbound message according to its event code.
     *
     * @param channelHandlerContext context of the channel the message arrived on; it is
     *                              attached to the published event
     * @param msg                   the inbound message, expected to be a {@link TcpMsg}
     * @throws Exception if parsing the body or publishing the event fails
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {
        if (!(msg instanceof TcpMsg)) {
            // Anything other than a decoded TcpMsg cannot be interpreted here.
            logger.warn("ignoring unexpected inbound message of type {}", msg.getClass().getName());
            return;
        }
        TcpMsg tcpMsg = (TcpMsg) msg;
        int code = tcpMsg.getCode();
        byte[] body = tcpMsg.getBody();

        Event event;
        if (BrokerEventCode.PUSH_MSG.getCode() == code) {
            event = buildPushMsgEvent(body);
        } else if (BrokerEventCode.CONSUME_MSG.getCode() == code) {
            event = buildConsumeMsgEvent(channelHandlerContext, body);
        } else {
            // Previously an unknown code left the event null and caused an NPE below.
            logger.warn("unsupported event code {}, message dropped", code);
            return;
        }
        if (event == null) {
            // The helper has already logged why no event could be built.
            return;
        }
        event.setChannelHandlerContext(channelHandlerContext);
        eventBus.publish(event);
    }

    /**
     * Builds the event for a producer push request.
     *
     * @param body JSON-encoded {@link MessageDTO}
     * @return the event, or {@code null} when the body does not yield a DTO
     */
    private PushMsgEvent buildPushMsgEvent(byte[] body) {
        MessageDTO messageDTO = JSON.parseObject(body, MessageDTO.class);
        if (messageDTO == null) {
            // parseObject may yield null (e.g. for an empty body).
            logger.warn("push message request has no parsable body, message dropped");
            return null;
        }
        if (logger.isInfoEnabled()) {
            // Decode with an explicit charset instead of the platform default, and only
            // pay for the string/JSON conversion when the log line is actually emitted.
            byte[] payload = messageDTO.getBody();
            String content = payload == null ? null : new String(payload, StandardCharsets.UTF_8);
            logger.info("收到消息推送内容:{},message is {}", content, JSON.toJSONString(messageDTO));
        }
        PushMsgEvent pushMsgEvent = new PushMsgEvent();
        pushMsgEvent.setMessageDTO(messageDTO);
        return pushMsgEvent;
    }

    /**
     * Builds the event for a consumer pull request, stamping the request with the
     * remote peer's host and port taken from the channel.
     *
     * @param channelHandlerContext context used to look up the remote address
     * @param body                  JSON-encoded {@link ConsumeMsgReqDTO}
     * @return the event, or {@code null} when the body does not yield a DTO or the remote
     *         address is not an {@link InetSocketAddress}
     */
    private ConsumeMsgEvent buildConsumeMsgEvent(ChannelHandlerContext channelHandlerContext, byte[] body) {
        ConsumeMsgReqDTO consumeMsgReqDTO = JSON.parseObject(body, ConsumeMsgReqDTO.class);
        if (consumeMsgReqDTO == null) {
            logger.warn("consume message request has no parsable body, message dropped");
            return null;
        }
        SocketAddress remoteAddress = channelHandlerContext.channel().remoteAddress();
        if (!(remoteAddress instanceof InetSocketAddress)) {
            // Covers both a null address and non-IP transports; the request needs ip/port.
            logger.warn("cannot resolve consumer ip/port from remote address {}, message dropped", remoteAddress);
            return null;
        }
        InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress;
        consumeMsgReqDTO.setIp(inetSocketAddress.getHostString());
        consumeMsgReqDTO.setPort(inetSocketAddress.getPort());

        ConsumeMsgEvent consumeMsgEvent = new ConsumeMsgEvent();
        consumeMsgEvent.setConsumeMsgReqDTO(consumeMsgReqDTO);
        consumeMsgEvent.setMsgId(consumeMsgReqDTO.getMsgId());
        return consumeMsgEvent;
    }

    /**
     * Logs that a connection became active and forwards the event along the pipeline.
     *
     * @param ctx context of the newly active channel
     * @throws Exception if forwarding the event fails
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        logger.info("new connection build");
        // Keep propagating the event so handlers further down the pipeline still see it.
        super.channelActive(ctx);
    }
}

